package rpc

import (
	"encoding/json"
	"fmt"
	"net"
	"os"
	"sync"
	"sync/atomic"
	"time"

	"github.com/steveyegge/beads/internal/storage"
)

// ServerVersion is the version of this RPC server
// This should match the bd CLI version for proper compatibility checks
// It's set dynamically by daemon.go from cmd/bd/version.go before starting the server
var ServerVersion = "0.0.0" // Placeholder; overridden by daemon startup

const (
	statusUnhealthy = "unhealthy"
)

// Server represents the RPC server that runs in the daemon
type Server struct {
	socketPath    string
	workspacePath string          // Absolute path to workspace root
	dbPath        string          // Absolute path to database file
	storage       storage.Storage // Default storage (for backward compat)
	listener      net.Listener
	mu            sync.RWMutex
	shutdown      bool
	shutdownChan  chan struct{}
	stopOnce      sync.Once
	doneChan      chan struct{} // closed when Start() cleanup is complete
	// Health and metrics
	startTime        time.Time
	lastActivityTime atomic.Value // time.Time - last request timestamp
	metrics          *Metrics
	// Connection limiting
	maxConns      int
	activeConns   int32 // atomic counter
	connSemaphore chan struct{}
	// Request timeout
	requestTimeout time.Duration
	// Ready channel signals when server is listening
	readyChan chan struct{}
	// Auto-import single-flight guard
	importInProgress atomic.Bool
	// Mutation events for event-driven daemon
	mutationChan    chan MutationEvent
	droppedEvents   atomic.Int64 // Counter for dropped mutation events
	// Recent mutations buffer for polling (circular buffer, max 100 events)
	recentMutations   []MutationEvent
	recentMutationsMu sync.RWMutex
	maxMutationBuffer int
}

// Mutation event types
const (
	MutationCreate  = "create"
	MutationUpdate  = "update"
	MutationDelete  = "delete"
	MutationComment = "comment"
)

// MutationEvent represents a database mutation for event-driven sync
type MutationEvent struct {
	Type      string    // One of: MutationCreate, MutationUpdate, MutationDelete, MutationComment
	IssueID   string    // e.g., "bd-42"
	Timestamp time.Time
}

// NewServer creates a new RPC server
func NewServer(socketPath string, store storage.Storage, workspacePath string, dbPath string) *Server {
	// Parse config from env vars
	maxConns := 100 // default
	if env := os.Getenv("BEADS_DAEMON_MAX_CONNS"); env != "" {
		var conns int
		if _, err := fmt.Sscanf(env, "%d", &conns); err == nil && conns > 0 {
			maxConns = conns
		}
	}

	requestTimeout := 30 * time.Second // default
	if env := os.Getenv("BEADS_DAEMON_REQUEST_TIMEOUT"); env != "" {
		if timeout, err := time.ParseDuration(env); err == nil && timeout > 0 {
			requestTimeout = timeout
		}
	}

	mutationBufferSize := 512 // default (increased from 100 for better burst handling)
	if env := os.Getenv("BEADS_MUTATION_BUFFER"); env != "" {
		var bufSize int
		if _, err := fmt.Sscanf(env, "%d", &bufSize); err == nil && bufSize > 0 {
			mutationBufferSize = bufSize
		}
	}

	s := &Server{
		socketPath:        socketPath,
		workspacePath:     workspacePath,
		dbPath:            dbPath,
		storage:           store,
		shutdownChan:      make(chan struct{}),
		doneChan:          make(chan struct{}),
		startTime:         time.Now(),
		metrics:           NewMetrics(),
		maxConns:          maxConns,
		connSemaphore:     make(chan struct{}, maxConns),
		requestTimeout:    requestTimeout,
		readyChan:         make(chan struct{}),
		mutationChan:      make(chan MutationEvent, mutationBufferSize), // Configurable buffer
		recentMutations:   make([]MutationEvent, 0, 100),
		maxMutationBuffer: 100,
	}
	s.lastActivityTime.Store(time.Now())
	return s
}

// emitMutation sends a mutation event to the daemon's event-driven loop.
// Non-blocking: drops event if channel is full (sync will happen eventually).
// Also stores in recent mutations buffer for polling.
func (s *Server) emitMutation(eventType, issueID string) {
	event := MutationEvent{
		Type:      eventType,
		IssueID:   issueID,
		Timestamp: time.Now(),
	}

	// Send to mutation channel for daemon
	select {
	case s.mutationChan <- event:
		// Event sent successfully
	default:
		// Channel full, increment dropped events counter
		s.droppedEvents.Add(1)
	}

	// Store in recent mutations buffer for polling
	s.recentMutationsMu.Lock()
	s.recentMutations = append(s.recentMutations, event)
	// Keep buffer size limited (circular buffer behavior)
	if len(s.recentMutations) > s.maxMutationBuffer {
		s.recentMutations = s.recentMutations[1:]
	}
	s.recentMutationsMu.Unlock()
}

// MutationChan returns the mutation event channel for the daemon to consume
func (s *Server) MutationChan() <-chan MutationEvent {
	return s.mutationChan
}

// ResetDroppedEventsCount resets the dropped events counter and returns the previous value
func (s *Server) ResetDroppedEventsCount() int64 {
	return s.droppedEvents.Swap(0)
}

// GetRecentMutations returns mutations since the given timestamp
func (s *Server) GetRecentMutations(sinceMillis int64) []MutationEvent {
	s.recentMutationsMu.RLock()
	defer s.recentMutationsMu.RUnlock()

	var result []MutationEvent
	for _, m := range s.recentMutations {
		if m.Timestamp.UnixMilli() > sinceMillis {
			result = append(result, m)
		}
	}
	return result
}

// handleGetMutations handles the get_mutations RPC operation
func (s *Server) handleGetMutations(req *Request) Response {
	var args GetMutationsArgs
	if err := json.Unmarshal(req.Args, &args); err != nil {
		return Response{
			Success: false,
			Error:   fmt.Sprintf("invalid arguments: %v", err),
		}
	}

	mutations := s.GetRecentMutations(args.Since)
	data, _ := json.Marshal(mutations)

	return Response{
		Success: true,
		Data:    data,
	}
}
